Skip to content

Prevent watcher producer skip propagating to downstream tasks via trigger_rule#2591

Closed
tatiana wants to merge 26 commits into
mainfrom
downstream-skip-issue
Closed

Prevent watcher producer skip propagating to downstream tasks via trigger_rule#2591
tatiana wants to merge 26 commits into
mainfrom
downstream-skip-issue

Conversation

@tatiana
Copy link
Copy Markdown
Collaborator

@tatiana tatiana commented Apr 22, 2026

When using ExecutionMode.WATCHER with DbtTaskGroup, the producer task may be skipped on retry (via AirflowSkipException) since #2559. Because the producer is a leaf task inside the TaskGroup, Airflow's default trigger_rule="all_success" causes any tasks downstream of the group to be skipped as well - even when all consumer tasks succeeded. This makes ExecutionMode.WATCHER behave differently from ExecutionMode.LOCAL when used with DbtTaskGroup.

This PR introduces an opt-in setting propagate_watcher_trigger_rule that overrides __rshift__ and set_downstream on DbtTaskGroup to automatically set trigger_rule="none_failed_min_one_success" on downstream tasks when in watcher mode. This ensures the producer's skip state does not propagate outside the task group, while still requiring at least one upstream task to succeed (preventing downstream tasks from running when the entire group is skipped, e.g. via ShortCircuit).

It is an alternative approach to It is an alternative approach to #2597.

This is an opt-in feature. Users enable it via:

export AIRFLOW__COSMOS__PROPAGATE_WATCHER_TRIGGER_RULE=True

Behaviour before and after

The newly introduced DAG example_watcher_downstream_not_skipped illustrates the problematic behaviour this PR aims to address.

Video before this change:

cosmos-1.14.1-downstream-tasks-before-fix.mp4

Video with AIRFLOW__COSMOS__PROPAGATE_WATCHER_TRIGGER_RULE enabled:

cosmos-1.14.1-downstream-tasks-after-fix.mp4

Why not set producer >> consumers inside the TaskGroup?

In DbtDag, we set producer >> consumers so the producer's skip does not propagate to downstream tasks. The root consumers use trigger_rule="always" so they start watching XCom immediately without waiting for the producer to complete.

In DbtTaskGroup, this approach does not work because setting trigger_rule="always" on consumers would cause them to run even when the user-defined upstream tasks of the task group have not yet completed, breaking the expected execution order.

Alternative approaches

  • Introduce a gateway task between the group and its downstream tasks: Watcher skip on producer failures #2430
  • Accept that ExecutionMode.LOCAL and ExecutionMode.WATCHER are not fully compatible and document that users should set trigger_rule="none_failed_min_one_success" manually

Limitations

  • Only works when the dependency is created from the DbtTaskGroup side (dbt_group >> task or dbt_group.set_downstream(task)). Does not work with task.set_upstream(dbt_group) or task << dbt_group, since Cosmos cannot intercept methods on tasks it does not control.
  • When enabled, overrides any user-defined trigger_rule on downstream tasks with "none_failed_min_one_success".

Unsolved problem: depends_on_past

This PR does not solve the problem: #2596.

PR #2430 also doesn't solve the depends_on_past race condition. The PR fixes the retry/skip mechanism (backup XCom, restore on retry, skip producer on second attempt), but it doesn't prevent Run 2's producer from starting while Run 1's consumers are still running their fallback dbt commands.

Closes #2594

@codecov
Copy link
Copy Markdown

codecov Bot commented Apr 22, 2026

Codecov Report

❌ Patch coverage is 70.83333% with 7 lines in your changes missing coverage. Please review.
✅ Project coverage is 97.96%. Comparing base (b218d33) to head (ceaae15).
⚠️ Report is 3 commits behind head on main.

Files with missing lines Patch % Lines
cosmos/airflow/task_group.py 70.83% 7 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2591      +/-   ##
==========================================
- Coverage   98.04%   97.96%   -0.09%     
==========================================
  Files         104      104              
  Lines        7737     7761      +24     
==========================================
+ Hits         7586     7603      +17     
- Misses        151      158       +7     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

tatiana added 8 commits April 22, 2026 13:05
…er skips

it will work if: users do dbt_task_group >> downstream task
if users do downstream task << dbt_task_group
if users do dbt_group.set_downstream(downstream task)

It will not work if users do post_dbt.set_upstream(dbt_group)
@tatiana tatiana changed the title WIP: Prevent watcher producer skip from propagating to DbtTaskGroup downstream tasks Prevent DbtTaskGroup watcher producer skip from propagating downstream tasks Apr 22, 2026
@tatiana tatiana marked this pull request as ready for review April 22, 2026 12:32
Copilot AI review requested due to automatic review settings April 22, 2026 12:32
@tatiana tatiana requested review from a team, corsettigyg, dwreeves and jbandoro as code owners April 22, 2026 12:32
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces an opt-in mitigation for a Watcher-mode DbtTaskGroup behavior where the producer can be skipped on retry and (via Airflow’s default all_success) inadvertently cause tasks downstream of the TaskGroup to be skipped even when all consumer tasks succeeded.

Changes:

  • Adds propagate_watcher_trigger_rule config (and env var) to opt in to the new behavior.
  • Updates DbtTaskGroup dependency wiring (>>, <<, set_downstream) to set downstream tasks’ trigger_rule="none_failed" in watcher modes when the setting is enabled.
  • Adds integration tests + a new example DAG and dbt project to reproduce/validate the behavior.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
cosmos/airflow/task_group.py Overrides downstream wiring to apply trigger_rule="none_failed" to downstream tasks when watcher + opt-in setting.
cosmos/settings.py Introduces propagate_watcher_trigger_rule boolean setting.
tests/operators/test_watcher.py Adds integration tests covering default behavior vs opt-in behavior.
docs/reference/configs/cosmos-conf.rst Documents the new propagate_watcher_trigger_rule setting and limitations.
docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst Expands watcher retry docs and documents the new setting in the watcher guide.
dev/failed_dags/example_watcher_downstream_not_skipped.py Adds a reproduction DAG demonstrating downstream-skip behavior and the opt-in fix.
dev/dags/dbt/watcher_downstream_not_skipped/dbt_project.yml Adds a small dbt project used by tests/repro DAG.
dev/dags/dbt/watcher_downstream_not_skipped/models/model_a.sql Sample model for the repro dbt project.
dev/dags/dbt/watcher_downstream_not_skipped/models/model_retry.sql Model designed to fail once (via sequence pre-hook) to exercise watcher retry/skip flow.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread cosmos/airflow/task_group.py Outdated
Comment thread tests/operators/test_watcher.py Outdated
Comment thread tests/operators/test_watcher.py Outdated
Comment thread dev/failed_dags/example_watcher_downstream_not_skipped.py Outdated
…ped as

  TriggerRule enum in AF3 but accepts string values at runtime. This matches the
  pattern used elsewhere in Cosmos (e.g., graph.py).
https://github.com/astronomer/astronomer-cosmos/pull/2591\#discussion_r3124199904

  - test_rshift_sets_trigger_rule_on_downstream_task_group_roots — updated to use
  get_roots() instead of children
  - test_set_downstream_sets_trigger_rule — covers set_downstream() path
  - test_rlshift_sets_trigger_rule — covers << path (__rlshift__)
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 2 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread docs/reference/configs/cosmos-conf.rst Outdated
Comment thread docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst Outdated
Copilot AI review requested due to automatic review settings April 22, 2026 13:36
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 5 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread docs/reference/configs/cosmos-conf.rst Outdated
Comment thread tests/operators/test_watcher.py Outdated
Comment thread tests/operators/test_watcher.py Outdated
Comment thread docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst Outdated
Comment thread cosmos/airflow/task_group.py Outdated
@tatiana tatiana changed the title Prevent DbtTaskGroup watcher producer skip from propagating downstream tasks Prevent watcher producer skip propagating to downstream tasks Apr 22, 2026
Copilot AI review requested due to automatic review settings April 22, 2026 15:27
@tatiana tatiana changed the title Prevent watcher producer skip propagating to downstream tasks Prevent watcher producer skip propagating to downstream tasks via trigger_rule Apr 22, 2026
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread tests/operators/test_watcher.py
return result

def __rlshift__(self, other: Any) -> Any:
# other << dbt_group — other is downstream of dbt_group
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

__rlshift__ is commented as supporting other << dbt_group (dependency created from the downstream task side), but the docs for propagate_watcher_trigger_rule explicitly call out that task << dbt_group / task.set_upstream(dbt_group) cannot be intercepted. Unless there is a concrete Airflow code path that actually invokes DbtTaskGroup.__rlshift__ for this expression, this method/comment is misleading and may give a false sense that the limitation is handled. Consider removing __rlshift__ (and its test) or updating the comment/docstring to clarify when (if ever) it is invoked.

Suggested change
# other << dbt_group — other is downstream of dbt_group
# Reflected ``<<`` hook used only when Python/Airflow dispatch resolves to this
# TaskGroup instance. This is not a general interception point for
# ``task << dbt_group`` / ``task.set_upstream(dbt_group)``; watcher trigger-rule
# propagation must not rely on this method being invoked for those cases.

Copilot uses AI. Check for mistakes.
Comment on lines +104 to +111
@patch("cosmos.settings.propagate_watcher_trigger_rule", True)
def test_rlshift_sets_trigger_rule():
tg = _make_task_group(ExecutionMode.WATCHER)
task = MagicMock(trigger_rule="all_success")

tg.__rlshift__(task)

assert task.trigger_rule == "none_failed_min_one_success"
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test calls tg.__rlshift__(task) directly, which doesn't reflect the supported user-facing wiring patterns documented for this feature (only dependencies created from the DbtTaskGroup side, e.g. dbt_group >> task / dbt_group.set_downstream(task)). If task << dbt_group is not interceptable in practice, this test may be asserting behavior that users can't rely on. Prefer removing this test or rewriting it to cover only supported patterns.

Suggested change
@patch("cosmos.settings.propagate_watcher_trigger_rule", True)
def test_rlshift_sets_trigger_rule():
tg = _make_task_group(ExecutionMode.WATCHER)
task = MagicMock(trigger_rule="all_success")
tg.__rlshift__(task)
assert task.trigger_rule == "none_failed_min_one_success"

Copilot uses AI. Check for mistakes.
@tatiana
Copy link
Copy Markdown
Collaborator Author

tatiana commented Apr 23, 2026

Closed as per discussion: #2594

tatiana added a commit that referenced this pull request Apr 23, 2026
…eway task (#2597)

Co-authored-by: John Horan <jhoran@zendesk.com>

When using `ExecutionMode.WATCHER` with `DbtTaskGroup`, the producer
task may be skipped on retry (via `AirflowSkipException`) since
#2559. Because the
producer is a leaf task inside the `TaskGroup`, Airflow's default
`trigger_rule="all_success"` causes any tasks downstream of the group to
be skipped as well - even when all consumer tasks succeeded. **This
makes `ExecutionMode.WATCHER` behave differently from
`ExecutionMode.LOCAL`** when used with `DbtTaskGroup`. This issue does
not happen in `DbtDag`.

This PR introduces a downstream gateway task
(`dbt_producer_watcher_done`) for the producer when in watcher mode.
This ensures the producer's skip state does not propagate outside the
task group, since the gateway task has `trigger_rule=none_failed`.

This solution was originally proposed by @johnhoran in
#2430. The code has
changed significantly since, so we're opening a new PR and adding him as
a co-author. It is an alternative approach to
#2591.

**Behaviour before and after**

The newly introduced DAG `example_watcher_downstream_not_skipped`
illustrates the problematic behaviour this PR aims to address.

Video before this change:


https://github.com/user-attachments/assets/3cd03975-3c97-44c5-bf6f-8f56796815a9

Video after this change:


https://github.com/user-attachments/assets/3a50b882-9ed0-4d16-b5ae-662bec76b4e3

**Alternative approaches**

- Cosmos changing the `trigger_rule` of the downstream tasks as
implemented in #2591
- Accept that `ExecutionMode.LOCAL` and `ExecutionMode.WATCHER` are not
fully compatible and document that users should set
`trigger_rule="none_failed_min_one_success"` manually

**Implications**

- The DAG topology is changed to include a new `EmptyOperator` task
(`dbt_producer_watcher_done`)

**Unsolved problem: `depends_on_past`**

This PR does not solve the problem:
#2596.

PR #2430 also doesn't solve the `depends_on_past` race condition. The PR
fixes the retry/skip mechanism (backup XCom, restore on retry, skip
producer on second attempt), but it doesn't prevent Run 2's producer
from starting while Run 1's consumers are still running their fallback
dbt commands.

Closes #2594
pankajkoti pushed a commit that referenced this pull request Apr 23, 2026
…eway task (#2597)

Co-authored-by: John Horan <jhoran@zendesk.com>

When using `ExecutionMode.WATCHER` with `DbtTaskGroup`, the producer
task may be skipped on retry (via `AirflowSkipException`) since
#2559. Because the
producer is a leaf task inside the `TaskGroup`, Airflow's default
`trigger_rule="all_success"` causes any tasks downstream of the group to
be skipped as well - even when all consumer tasks succeeded. **This
makes `ExecutionMode.WATCHER` behave differently from
`ExecutionMode.LOCAL`** when used with `DbtTaskGroup`. This issue does
not happen in `DbtDag`.

This PR introduces a downstream gateway task
(`dbt_producer_watcher_done`) for the producer when in watcher mode.
This ensures the producer's skip state does not propagate outside the
task group, since the gateway task has `trigger_rule=none_failed`.

This solution was originally proposed by @johnhoran in
#2430. The code has
changed significantly since, so we're opening a new PR and adding him as
a co-author. It is an alternative approach to
#2591.

**Behaviour before and after**

The newly introduced DAG `example_watcher_downstream_not_skipped`
illustrates the problematic behaviour this PR aims to address.

Video before this change:

https://github.com/user-attachments/assets/3cd03975-3c97-44c5-bf6f-8f56796815a9

Video after this change:

https://github.com/user-attachments/assets/3a50b882-9ed0-4d16-b5ae-662bec76b4e3

**Alternative approaches**

- Cosmos changing the `trigger_rule` of the downstream tasks as
implemented in #2591
- Accept that `ExecutionMode.LOCAL` and `ExecutionMode.WATCHER` are not
fully compatible and document that users should set
`trigger_rule="none_failed_min_one_success"` manually

**Implications**

- The DAG topology is changed to include a new `EmptyOperator` task
(`dbt_producer_watcher_done`)

**Unsolved problem: `depends_on_past`**

This PR does not solve the problem:
#2596.

PR #2430 also doesn't solve the `depends_on_past` race condition. The PR
fixes the retry/skip mechanism (backup XCom, restore on retry, skip
producer on second attempt), but it doesn't prevent Run 2's producer
from starting while Run 1's consumers are still running their fallback
dbt commands.

Closes #2594

(cherry picked from commit 3a138fd)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

DbtTaskGroup watcher: producer skip propagates to downstream tasks

2 participants